package com.atguigu.gmall.realtime.app.func;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.beans.TableProcess;
import com.atguigu.gmall.realtime.common.GmallConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;

/**
 * Author: Felix
 * Date: 2021/12/26
 * Desc: 实现业务数据的动态分流
 * 1.读取配置流中的信息，封装为TableProcess对象，放到广播状态中
 * 2.从状态中读取配置信息，当处理主流数据的时候，判断是维护还是事实
 * 3.提前创建维度表
 * 4.过滤数据
 */
public class TableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private OutputTag<JSONObject> dimTag;

    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;

    private Connection conn;

    public TableProcessFunction(OutputTag<JSONObject> dimTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.dimTag = dimTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //注册驱动
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        //获取连接对象
        conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }

    //处理主流中的业务数据的
    @Override
    public void processElement(JSONObject jsonObj, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //获取广播状态
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);

        String tableName = jsonObj.getString("table");
        String type = jsonObj.getString("type");

        // 注意：如果使用Maxwell的bootstrap同步历史数据的话，那么type为bootstrap-insert，我们这里做一个统一 都设置为insert
        if (type.equals("bootstrap-insert")) {
            type = "insert";
            jsonObj.put("type", type);
        }

        //拼接key
        String key = tableName + ":" + type;

        //获取当前处理的数据对应的配置信息
        TableProcess tableProcess = broadcastState.get(key);

        if (tableProcess != null) {
            //在配置表中存在该条数据对应的配置信息
            String sinkType = tableProcess.getSinkType();
            //不管是事实数据还是维度数据，在向下游传递之前，都需要将输出目的地携带上
            String sinkTable = tableProcess.getSinkTable();
            jsonObj.put("sink_table", sinkTable);

            //对数据进行过滤
            String sinkColumns = tableProcess.getSinkColumns();
            JSONObject dataJsonObj = jsonObj.getJSONObject("data");
            filterColumn(sinkColumns,dataJsonObj);

            if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
                //事实数据 ---- 写到主流
                out.collect(jsonObj);
            } else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {
                //维度数据 ---- 写到维度侧输出流
                ctx.output(dimTag, jsonObj);
            }
        } else {
            //在配置表中没有找到该条数据对应的配置信息
            System.out.println("No this key in TableProcess : " + key);
        }
    }

    //处理广播流中的配置数据
    //jsonStr  {"database":"","table":"","type":"","data":{}}
    @Override
    public void processBroadcastElement(String jsonStr, Context ctx, Collector<JSONObject> out) throws Exception {
        //为了操作方便，将json字符串转换为json对象
        JSONObject jsonObj = JSON.parseObject(jsonStr);
        JSONObject dataJsonObj = jsonObj.getJSONObject("data");
        //将json对象转换为  配置实体类对象
        TableProcess tableProcess = dataJsonObj.toJavaObject(TableProcess.class);
        //获取业务数据库中表名
        String tableName = tableProcess.getSourceTable();
        //获取操作类型
        String type = tableProcess.getOperateType();
        //获取输出标记  事实还是维度   事实：kafka     维度：hbase
        String sinkType = tableProcess.getSinkType();
        //获取输出的目的地
        String sinkTable = tableProcess.getSinkTable();
        //获取输出的字段       作用1：创建维度表的时候 指定表中的字段        作用2：过滤字段
        String sinkColumns = tableProcess.getSinkColumns();
        //获取建表的主键
        String sinkPk = tableProcess.getSinkPk();
        //获取建表的扩展
        String sinkExtend = tableProcess.getSinkExtend();

        //拼接存在广播状态中的key
        String key = tableName + ":" + type;

        //如果当前读取到的配置信息是维度配置的话，提前将维度表创建出来
        if (TableProcess.SINK_TYPE_HBASE.equals(sinkType) && "insert".equals(type)) {
            checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
        }

        //获取广播状态
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        //将读取的一条配置信息放到状态中保存
        broadcastState.put(key, tableProcess);
    }

    //创建维度表
    private void checkTable(String tableName, String columns, String pk, String ext) {
        if (pk == null) {
            pk = "id";
        }

        if (ext == null) {
            ext = "";
        }
        //拼接建表语句
        StringBuilder createSql = new StringBuilder("create table if not exists " + GmallConfig.HBASE_SCHEMA + "." + tableName + "(");

        String[] columnArr = columns.split(",");
        for (int i = 0; i < columnArr.length; i++) {
            String column = columnArr[i];
            if (column.equals(pk)) {
                createSql.append(column + " varchar primary key ");
            } else {
                createSql.append(column + " varchar");
            }

            if (i < columnArr.length - 1) {
                createSql.append(",");
            }
        }
        createSql.append(") " + ext);
        System.out.println("在phoenix中的建表语句是:" + createSql);

        PreparedStatement ps = null;
        try {
            //创建数据库操作对象
            ps = conn.prepareStatement(createSql.toString());
            //执行SQl
            ps.execute();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("在Phoenix中建表失败" + createSql);
        } finally {
            //释放资源
            if (ps != null) {
                try {
                    ps.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 过滤字段
     * @param sinkColumns   id,tm_name
     * @param dataJsonObj   {"tm_name":"bb","logo_url":"bbb","id":13}
     */
    private void filterColumn(String sinkColumns, JSONObject dataJsonObj) {
        //为了判断json中的属性是否需要保留，我们将sinkColumns进行转换
        String[] fieldArr = sinkColumns.split(",");
        List<String> fieldList = Arrays.asList(fieldArr);

        Set<Map.Entry<String, Object>> entrySet = dataJsonObj.entrySet();

        entrySet.removeIf(entry->!fieldList.contains(entry.getKey()));
    }
}
